import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class WindowFold
{



    public static void main(String[] args) throws Exception
    {
        //获取执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //读取数据
        DataStream<Tuple3<String,String,Integer>> input = env.fromElements(ENGLISH);

        //keyBy(0) 计算班级总成绩，下标0表示班级
        //countWindow(2) 根据元素个数对数据流进行分组切片，达到2个，触发窗口进行计算
         DataStream<String>result=input.keyBy(0).countWindow(2).fold("名单start", new FoldFunction<Tuple3<String, String, Integer>, String>() {
            @Override
            public String fold(String current, Tuple3<String, String, Integer> o) throws Exception 
            {
                return current + "-" + o.f0 + "-" + o.f1 + "-" + String.valueOf(o.f2);
            }
        });

        //输出结果
        //效果如下：
        //2> (class1,张三,130)
        result.print();

        env.execute("TestReduceFunctionOnWindow");
    }


//    ---------------------------------------------------下面是输入测试数据-----------------------------------------------------------------
    /**
     * 定义班级的三元数组
     */
    public static final Tuple3[] ENGLISH = new Tuple3[]
            {
                    //班级 姓名 成绩
                    Tuple3.of("class1","张三",100),
                    Tuple3.of("class1","李四",30),
                    Tuple3.of("class1","王五",70),
                    Tuple3.of("class2","赵六",50),
                    Tuple3.of("class2","小七",40),
                    Tuple3.of("class2","小八",10),
            };
}
